#include "config.h"

#include <sys/types.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <dirent.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBCONTROL

#include "limits.h"
#include "adt.h"
#include "sysy_lib.h"
#include "bmap.h"
#include "net_table.h"
#include "configure.h"
#include "table_proto.h"
#include "volume_proto.h"
#include "metadata.h"
#include "core.h"
#include "md_proto.h"
#include "md_map.h"
#include "net_global.h"
#include "volume_ctl.h"
#include "../storage/stor_rpc.h"
#include "volume.h"
#include "job_dock.h"
#include "ylog.h"
#include "dbg.h"

/*
 * Write every chunk_io of a request against a volume flagged as a clone.
 *
 * For each of the chunk_count entries in _chunk_io the sequence is:
 *   volume_proto_chunk_pre_write -> chunk_proto_ec_write ->
 *   volume_proto_chunk_post_io.
 * When the pre-write step reports ENOENT, volume_proto_snapshot_chunk_clone
 * is called for that chunk id and the pre-write step is attempted again.
 *
 * Returns 0 when all chunks were written, otherwise the first error code
 * met. Processing stops at the first failing chunk.
 *
 * Note: `retry` is used both as an int counter and as a label name; this is
 * legal C because labels live in their own name space.
 */
STATIC int __volume_proto_ec_chunk_write_clone(volume_proto_t *volume_proto, io_opt_t *io_opt,
                                          chunk_io_t *_chunk_io, int chunk_count)
{
        int ret, i, retry = 0;
        chunk_io_t *chunk_io;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        for (i = 0; i < chunk_count; i++) {
                chunk_io = &_chunk_io[i];
        retry:
                /* (re)entry point after a successful chunk clone */
                ret =  volume_proto_chunk_pre_write(volume_proto, io_opt, chunk_io);
                if (unlikely(ret)) {
                        if (ret == ENOENT) {
                        retry1:
                                /* pre-write found no such chunk: clone it, then redo the pre-write */
                                ret = volume_proto_snapshot_chunk_clone(volume_proto,
                                                                        &chunk_io->io.id, 0);
                                if (unlikely(ret)) {
                                        if (chunk_count > 1) {
                                                /*
                                                 * USLEEP_RETRY is defined elsewhere; presumably it
                                                 * sleeps, bumps the `retry` counter and jumps back
                                                 * to retry1 up to 10 times before leaving through
                                                 * err_exit -- TODO confirm against the macro.
                                                 * The counter is not reset here between chunks.
                                                 */
                                                USLEEP_RETRY(err_exit, ret, retry1, retry, 10, (1000 * 1000));
                                        } else
                                                GOTO(err_ret, ret);
                                }

                                goto retry;
                        } else {
                                DWARN("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&chunk_io->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }
                }

                /* the chunk is ready: do the erasure-coded write itself */
                ret = chunk_proto_ec_write(chunk_io->chkinfo, chunk_io->chkstat, chunk_io->vfm,
                                       &chunk_io->io, &chunk_io->buf, ec);
                if (unlikely(ret)) {
                        DWARN("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }

                /* post-io step, fed with the clock carried by this io */
                ret =  volume_proto_chunk_post_io(volume_proto, &chunk_io->io.id,
                                                chunk_io->chkinfo, chunk_io->chkstat,
                                                chunk_io->io.vclock.clock);
                if (unlikely(ret)) {
                        DWARN("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_exit:
        /* clone retries exhausted: log, zero volume_proto->ltime, then fall through to err_ret */
        DWARN("chunk "CHKID_FORMAT" %s, exit for reset clock\n",
              CHKID_ARG(&chunk_io->io.id), strerror(ret));
        volume_proto->ltime = 0;
        //EXIT(EAGAIN);
err_ret:
        return ret;
}

/*
 * Fill `strips` with the current content of the strip range [off, off+count)
 * of one chunk, as the read half of a read-modify-write.
 *
 * If volume_proto_chunk_pre_read reports ENOENT, each of the ec->k strip
 * buffers gets count * STRIP_BLOCK zero bytes through mbuffer_appendzero
 * instead. Any other pre-read error is logged and returned.
 *
 * On success every one of the ec->k strip buffers is asserted to hold
 * exactly count * STRIP_BLOCK bytes.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_proto_ec_chunk_pull__(volume_proto_t *volume_proto, chunk_io_t *chunk_io,
                const ec_t *ec, uint32_t off, uint32_t count, buffer_t *strips)
{
        int ret, i;

        ret = volume_proto_chunk_pre_read(volume_proto, chunk_io);
        if (unlikely(ret) && ret != ENOENT) {
                /* a real failure, not just a missing chunk */
                DINFO("chunk "CHKID_FORMAT" %s\n",
                                CHKID_ARG(&chunk_io->io.id), strerror(ret));
                GOTO(err_ret, ret);
        }

        if (ret == ENOENT) {
                /* missing chunk: hand back zero-filled strips */
                i = 0;
                while (i < ec->k) {
                        mbuffer_appendzero(&strips[i], count * STRIP_BLOCK);
                        i++;
                }
        } else {
                /* chunk is there: pull the strips through the chunk layer */
                ret = chunk_proto_ec_write_pull(chunk_io->chkinfo, chunk_io->chkstat,
                                &chunk_io->io, ec, off, count, strips);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }
        }

        /* either way, every data strip must now be fully populated */
        i = 0;
        while (i < ec->k) {
                YASSERT(strips[i].len == count * STRIP_BLOCK);
                i++;
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Write half of a read-modify-write: push the prepared `strips` for the
 * strip range [off, off+count) of one chunk.
 *
 * Runs three steps in order and stops at the first failure:
 *   1. volume_proto_chunk_pre_write
 *   2. chunk_proto_ec_write_commit
 *   3. volume_proto_chunk_post_io (given the clock carried by the io)
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
STATIC int __volume_proto_ec_chunk_commit__(volume_proto_t *volume_proto, io_opt_t *io_opt, chunk_io_t *chunk_io,
                const ec_t *ec, uint32_t off, uint32_t count, buffer_t *strips)
{
        int ret;

        /* step 1: pre-write */
        ret = volume_proto_chunk_pre_write(volume_proto, io_opt, chunk_io);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* step 2: commit the strips */
        ret = chunk_proto_ec_write_commit(chunk_io->chkinfo,
                                          chunk_io->chkstat,
                                          &chunk_io->io,
                                          ec, off, count, strips);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* step 3: post-io */
        ret = volume_proto_chunk_post_io(volume_proto,
                                         &chunk_io->io.id,
                                         chunk_io->chkinfo,
                                         chunk_io->chkstat,
                                         chunk_io->io.vclock.clock);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Return 1 when both the offset and the size of `io` are whole multiples of
 * STRIP_SIZE(ec), 0 otherwise.
 */
static inline int __is_strip_align(const io_t *io, const ec_t *ec)
{
        if (io->offset % STRIP_SIZE(ec) != 0) {
                return 0;
        }

        return io->size % STRIP_SIZE(ec) == 0;
}

/*
 * Read-modify-write of one chunk of an erasure-coded volume.
 *
 * Steps:
 *   1. chunk_proto_ec_strips_init  - set up `strips` and the strip range
 *                                    (off, count) for this io
 *   2. __volume_proto_ec_chunk_pull__  - fetch the current strip content
 *   3. chunk_proto_ec_write_strip  - apply the caller's data to the strips
 *   4. (ECLOG_ENABLE, io not strip-aligned) chunk_proto_eclog_write
 *   5. __volume_proto_ec_chunk_commit__ - write the strips back
 *   6. (ECLOG_ENABLE, log written in step 4) chunk_proto_eclog_clean
 *
 * The strips are released on every exit path.
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 * When the commit fails, the commit's error is what is returned: the eclog
 * cleanup done on that path is best effort and its own result is only
 * logged, so a successful cleanup can no longer turn a failed write into a
 * reported success.
 */
STATIC int __volume_proto_ec_chunk_write__(volume_proto_t *volume_proto, io_opt_t *io_opt,chunk_io_t *chunk_io, const ec_t *ec)
{
        int ret;
        uint32_t off = 0, count = 0;
        buffer_t strips[EC_MMAX];
#if ECLOG_ENABLE
        int clean_ret;
        int logged = 0;         /* set once chunk_proto_eclog_write succeeded */
        eclog_t *eclog = NULL;
        ecloc_t ecloc;
#endif

        chunk_proto_ec_strips_init(&chunk_io->io, ec, &off, &count, strips);

        ret = __volume_proto_ec_chunk_pull__(volume_proto, chunk_io, ec, off, count, strips);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        ret = chunk_proto_ec_write_strip(&chunk_io->io, &chunk_io->buf, ec, off, count, strips);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        DBUG("eclog. write info. offset(%lld), len(%d), stripe(%d)\n",
                (LLU)chunk_io->io.offset, chunk_io->io.size,
                STRIP_SIZE(ec));

#if ECLOG_ENABLE
        /* only a partial-strip write is logged before the commit */
        if (!__is_strip_align(&chunk_io->io, ec)) {
                ret = chunk_proto_eclog_write(volume_proto, &chunk_io->io,
                                ec, off, count, strips, &eclog, &ecloc);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                logged = 1;
        }
#endif

        ret = __volume_proto_ec_chunk_commit__(volume_proto, io_opt, chunk_io, ec, off, count, strips);
        if (unlikely(ret))
                GOTO(err_clean, ret);

#if ECLOG_ENABLE
        if (logged) {
                ret = chunk_proto_eclog_clean(eclog, &ecloc);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }
#endif

        chunk_proto_ec_strips_free(strips);
        return 0;
err_clean:
#if ECLOG_ENABLE
        /*
         * Clean only a log that was really written (eclog/ecloc are not
         * valid otherwise), and keep `ret` holding the commit error.
         */
        if (logged) {
                clean_ret = chunk_proto_eclog_clean(eclog, &ecloc);
                if (unlikely(clean_ret)) {
                        DWARN("chunk "CHKID_FORMAT" eclog clean fail: %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(clean_ret));
                }
        }
#endif
err_ret:
        chunk_proto_ec_strips_free(strips);
        return ret;

}

/*
 * Write every chunk_io of a request against a plain (non-clone,
 * non-snapshot) erasure-coded volume.
 *
 * IO_OPT_CREAT is OR-ed into io_opt->flag before any chunk is touched.
 * Each chunk is then written under its table2 EC write lock; on a write
 * failure the chunk is reset with volume_proto_chunk_reset, the lock is
 * dropped and the error is returned. Processing stops at the first failure.
 *
 * Returns 0 when all chunks were written, otherwise the first error code.
 */
STATIC int __volume_proto_ec_chunk_write(volume_proto_t *volume_proto, io_opt_t *io_opt,
                                      chunk_io_t *_chunk_io, int chunk_count)
{
        int ret, i;
        chunk_io_t *chunk_io;
        table2_t *table2 = &volume_proto->table2;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        io_opt->flag |= IO_OPT_CREAT;

        i = 0;
        while (i < chunk_count) {
                chunk_io = _chunk_io + i;

                /*
                 * Why the lock is needed (example with a 4K strip):
                 *
                 * An EC write is a read-modify-write:
                 *   1) read the strip
                 *   2) encode with the new data
                 *   3) write the strip
                 *
                 * Take two concurrent ios hitting the same strip "aaaa aaaa":
                 *   io1: off 0,  size 2K, data bbbb
                 *   io2: off 2K, size 2K, data cccc
                 *
                 *   io1: read(aaaa aaaa) -> encode(bbbb aaaa) -> write
                 *   io2: read(aaaa aaaa) -> encode(aaaa cccc) -> write
                 *
                 * Whichever io writes last wipes out the other one's half,
                 * so the whole sequence is serialised per chunk.
                 */
                ret = table2_ec_wrlock(table2, &chunk_io->io.id);
                if (unlikely(ret)) {
                        GOTO(err_ret, ret);
                }

                ret = __volume_proto_ec_chunk_write__(volume_proto, io_opt, chunk_io, ec);
                if (unlikely(ret)) {
                        DWARN("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&chunk_io->io.id), strerror(ret));

                        volume_proto_chunk_reset(volume_proto, &chunk_io->io.id);
                        GOTO(err_lock, ret);
                }

                table2_ec_unlock(table2, &chunk_io->io.id);
                i++;
        }

        return 0;
err_lock:
        /* the failing chunk is still locked at this point */
        table2_ec_unlock(table2, &chunk_io->io.id);
err_ret:
        return ret;
}

/*
 * Entry point for a write to an erasure-coded volume.
 *
 * - A volume flagged __FILE_ATTR_SNAPSHOT__ is rejected with EPERM.
 * - A zero-sized io returns 0 without doing anything.
 * - Otherwise the io is split into per-chunk ios (volume_proto_io_split),
 *   which are handed to the clone or the plain chunk writer depending on
 *   __FILE_ATTR_CLONE__.
 *
 * Whether the write succeeds or fails after the split, each per-chunk
 * buffer is merged back into `buf` (the const qualifier is cast away for
 * that) and the chunk_io array is returned to the 8K memory cache.
 *
 * Returns 0 on success, otherwise an error code.
 */
int volume_proto_ec_write(volume_proto_t *volume_proto, io_opt_t *io_opt, const io_t *io, const buffer_t *buf)
{
        int ret, chunk_count = 0, i;
        chunk_io_t *chunk_io, *tmp;

        /* snapshots cannot be written */
        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        DBUG("write file "CHKID_FORMAT" size %llu off %llu\n",
             CHKID_ARG(&volume_proto->chkid), (LLU)io->size, (LLU)io->offset);

        ANALYSIS_BEGIN(0);

        /* nothing to write */
        if (unlikely(io->size == 0)) {
                goto out;
        }

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*chunk_io)  * LICH_SPLIT_MAX  < MEM_CACHE_SIZE8K, "split");
#endif

        chunk_io = mem_cache_calloc(MEM_CACHE_8K, 1);

        ret = volume_proto_io_split(volume_proto, io, (buffer_t *)buf,
                                    chunk_io, &chunk_count);
        if (unlikely(ret)) {
                GOTO(err_split, ret);
        }

        /* dispatch on the volume attributes */
        if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__)) {
                YASSERT(0);
        } else if (unlikely(volume_proto->table1.fileinfo.attr & __FILE_ATTR_CLONE__)) {
                ret = __volume_proto_ec_chunk_write_clone(volume_proto, io_opt, chunk_io, chunk_count);
                if (unlikely(ret)) {
                        GOTO(err_split, ret);
                }
        } else {
                ret = __volume_proto_ec_chunk_write(volume_proto, io_opt, chunk_io, chunk_count);
                if (unlikely(ret)) {
                        GOTO(err_split, ret);
                }
        }

        /* give the per-chunk buffers back to the caller's buffer */
        for (i = 0, tmp = chunk_io; i < chunk_count; i++, tmp++) {
                YASSERT(tmp->buf.len);
                mbuffer_merge((buffer_t *)buf, &tmp->buf);
        }

        ANALYSIS_QUEUE(0, 1000 * 1000, "volume_write");

        mem_cache_free(MEM_CACHE_8K, chunk_io);

out:
        return 0;
err_split:
        /* same hand-back as on success, then report the error */
        for (i = 0, tmp = chunk_io; i < chunk_count; i++, tmp++) {
                YASSERT(tmp->buf.len);
                mbuffer_merge((buffer_t *)buf, &tmp->buf);
        }

        mem_cache_free(MEM_CACHE_8K, chunk_io);
err_ret:
        return ret;
}

/*
 * Read every chunk_io of a request against a snapshot volume.
 *
 * Each chunk goes through volume_proto_chunk_pre_read and then
 * chunk_proto_ec_read. ENOENT from the pre-read step is reported to the
 * caller as ENOKEY; any other error is logged and returned as is.
 *
 * On success the per-chunk buffers are merged into `buf` in chunk order.
 * On failure the buffers of all chunk_count chunk_ios are freed.
 *
 * Returns 0 on success, otherwise an error code.
 */
STATIC int __volume_proto_ec_chunk_read_snapshot(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                            int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *cur;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        for (i = 0; i < chunk_count; i++) {
                cur = _chunk_io + i;

                ret = volume_proto_chunk_pre_read(volume_proto, cur);
                if (unlikely(ret)) {
                        if (ret != ENOENT) {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&cur->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }

                        /* a missing chunk is reported as ENOKEY */
                        ret = ENOKEY;
                        GOTO(err_ret, ret);
                }

                ret = chunk_proto_ec_read(cur->chkinfo, cur->chkstat, cur->vfm,
                                          &cur->io, &cur->buf, ec);
                if (unlikely(ret)) {
                        DINFO("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&cur->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        /* all chunks read: assemble the result */
        i = 0;
        while (i < chunk_count) {
                mbuffer_merge(buf, &_chunk_io[i].buf);
                i++;
        }

        return 0;
err_ret:
        /* drop whatever was collected so far */
        i = 0;
        while (i < chunk_count) {
                mbuffer_free(&_chunk_io[i].buf);
                i++;
        }
        return ret;
}

/*
 * Read every chunk_io of a request against a clone volume.
 *
 * Each chunk goes through volume_proto_chunk_pre_read and then
 * chunk_proto_ec_read. When the pre-read step reports ENOENT the chunk is
 * instead served by volume_proto_snapshot_chunk_redirect, which is given
 * the chunk's buffer, size and offset. Any other error is logged and
 * returned.
 *
 * On success the per-chunk buffers are merged into `buf` in chunk order.
 * On failure the buffers of all chunk_count chunk_ios are freed.
 *
 * Returns 0 on success, otherwise an error code.
 */
STATIC int __volume_proto_ec_chunk_read_clone(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                         int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *cur;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        for (i = 0; i < chunk_count; i++) {
                cur = _chunk_io + i;

                ret = volume_proto_chunk_pre_read(volume_proto, cur);
                if (unlikely(ret)) {
                        if (ret != ENOENT) {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&cur->io.id), strerror(ret));
                                GOTO(err_ret, ret);
                        }

                        /* chunk not present in the clone: redirect the read */
                        ret = volume_proto_snapshot_chunk_redirect(volume_proto,
                                                                   &cur->io.id,
                                                                   &cur->buf,
                                                                   cur->io.size,
                                                                   cur->io.offset,
                                                                   TRUE);
                        if (unlikely(ret)) {
                                GOTO(err_ret, ret);
                        }

                        continue;
                }

                ret = chunk_proto_ec_read(cur->chkinfo, cur->chkstat, cur->vfm,
                                          &cur->io, &cur->buf, ec);
                if (unlikely(ret)) {
                        DINFO("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&cur->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        /* all chunks read: assemble the result */
        i = 0;
        while (i < chunk_count) {
                mbuffer_merge(buf, &_chunk_io[i].buf);
                i++;
        }

        return 0;
err_ret:
        /* drop whatever was collected so far */
        i = 0;
        while (i < chunk_count) {
                mbuffer_free(&_chunk_io[i].buf);
                i++;
        }
        return ret;
}

/*
 * Read every chunk_io of a request against a plain (non-clone,
 * non-snapshot) erasure-coded volume.
 *
 * Each chunk goes through volume_proto_chunk_pre_read and then
 * chunk_proto_ec_read. When the pre-read step reports ENOENT the chunk's
 * buffer is filled with io.size zero bytes instead. Any other pre-read
 * error is logged, the chunk is reset with volume_proto_chunk_reset and
 * the error is returned. A chunk_proto_ec_read error is logged and
 * returned without a reset.
 *
 * On success the per-chunk buffers are merged into `buf` in chunk order.
 * On failure the buffers of all chunk_count chunk_ios are freed.
 *
 * Returns 0 on success, otherwise an error code.
 *
 * NOTE(review): the DBUG below dereferences _chunk_io->chkinfo and
 * _chunk_io->chkstat before the pre-read step has run for the first
 * chunk -- confirm these pointers are already valid at that point.
 */
STATIC int __volume_proto_ec_chunk_read(volume_proto_t *volume_proto, chunk_io_t *_chunk_io,
                                     int chunk_count, buffer_t *buf)
{
        int ret, i;
        chunk_io_t *cur;
        ec_t *ec = &volume_proto->table1.fileinfo.ec;

        DBUG("ec chunk read. ioinfo chunnkid(id(%llu), type(%u), idx(%u)), clock(%llu), "
                "offset(%llu), size(%u), flags(%u), lsn(%llu), chkinfo chunkdi(id(%llu), "
                "type(%u), idx(%u)), magic(%u), repnum(%u), mtime(%u), infover(%llu), "
                "snapver(%llu), chkstat_clock(%llu), read(%u), write(%u), chunkcount(%u)\n",
                (LLU)_chunk_io->io.id.id, _chunk_io->io.id.type, _chunk_io->io.id.idx,
                (LLU)_chunk_io->io.vclock.clock, (LLU)_chunk_io->io.offset, _chunk_io->io.size,
                _chunk_io->io.flags, (LLU)_chunk_io->io.lsn, (LLU)_chunk_io->chkinfo->id.id,
                _chunk_io->chkinfo->id.type, _chunk_io->chkinfo->id.idx, _chunk_io->chkinfo->magic,
                _chunk_io->chkinfo->repnum, _chunk_io->chkinfo->mtime,
                (LLU)_chunk_io->chkinfo->info_version, (LLU)_chunk_io->chkinfo->snap_version,
                (LLU)_chunk_io->chkstat->chkstat_clock, _chunk_io->chkstat->read,
                _chunk_io->chkstat->write, chunk_count);

        for (i = 0; i < chunk_count; i++) {
                cur = _chunk_io + i;

                ret = volume_proto_chunk_pre_read(volume_proto, cur);
                if (unlikely(ret)) {
                        if (ret != ENOENT) {
                                DINFO("chunk "CHKID_FORMAT" %s\n",
                                      CHKID_ARG(&cur->io.id), strerror(ret));

                                volume_proto_chunk_reset(volume_proto, &cur->io.id);
                                GOTO(err_ret, ret);
                        }

                        /* missing chunk reads back as zeros */
                        mbuffer_appendzero(&cur->buf, cur->io.size);
                        continue;
                }

                ret = chunk_proto_ec_read(cur->chkinfo, cur->chkstat, cur->vfm,
                                          &cur->io, &cur->buf, ec);
                if (unlikely(ret)) {
                        DINFO("chunk "CHKID_FORMAT" %s\n",
                              CHKID_ARG(&cur->io.id), strerror(ret));
                        GOTO(err_ret, ret);
                }
        }

        /* all chunks read: assemble the result */
        i = 0;
        while (i < chunk_count) {
                mbuffer_merge(buf, &_chunk_io[i].buf);
                i++;
        }

        return 0;
err_ret:
        /* drop whatever was collected so far */
        i = 0;
        while (i < chunk_count) {
                mbuffer_free(&_chunk_io[i].buf);
                i++;
        }
        return ret;
}

/*
 * Entry point for a read from an erasure-coded volume.
 *
 * - The id carried by `io` must equal the volume's own id (asserted).
 * - A zero-sized io returns 0 without doing anything.
 * - Otherwise the io is split into per-chunk ios (volume_proto_io_split)
 *   and handed to the snapshot, clone or plain chunk reader depending on
 *   the volume attributes. The chosen reader merges its data into `buf`.
 *
 * The chunk_io array comes from the 8K memory cache and is returned to it
 * on both the success and the error path.
 *
 * Returns 0 on success, otherwise an error code.
 */
int volume_proto_ec_read(volume_proto_t *volume_proto, const io_t *io, buffer_t *buf)
{
        int ret, chunk_count = 0;
        chunk_io_t *chunk_io;//[LICH_SPLIT_MAX];

        DBUG("read file "CHKID_FORMAT" size %llu off %llu\n",
              CHKID_ARG(&volume_proto->chkid), (LLU)io->size, (LLU)io->offset);

        /*
         * Compare, do not assign: with `=` the assert overwrote the
         * volume's id with the io's id and only checked it was non-zero.
         */
        YASSERT(volume_proto->chkid.id == io->id.id);
        if (unlikely(io->size == 0)) {
                goto out;
        }

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*chunk_io)  * LICH_SPLIT_MAX  < MEM_CACHE_SIZE8K, "split");
#endif

        chunk_io = mem_cache_calloc(MEM_CACHE_8K, 1);

        ANALYSIS_BEGIN(0);

        ret = volume_proto_io_split(volume_proto, io, NULL,
                                      chunk_io, &chunk_count);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        /* dispatch on the volume attributes; snapshot takes precedence over clone */
        if (volume_proto->table1.fileinfo.attr & __FILE_ATTR_SNAPSHOT__) {
                ret = __volume_proto_ec_chunk_read_snapshot(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else if (volume_proto->table1.fileinfo.attr & __FILE_ATTR_CLONE__) {
                ret = __volume_proto_ec_chunk_read_clone(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                ret = __volume_proto_ec_chunk_read(volume_proto, chunk_io, chunk_count, buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ANALYSIS_QUEUE(0, 1000 * 1000, "volume_read");

        mem_cache_free(MEM_CACHE_8K, chunk_io);

out:
        return 0;
err_ret:
        mem_cache_free(MEM_CACHE_8K, chunk_io);
        return ret;
}
